package cn.echcz.twitbase.dao

import cn.echcz.twitbase.domain.Twit
import cn.echcz.twitbase.util.Common._
import cn.echcz.twitbase.util.{HbaseUtils, RowKeyUtils}
import org.apache.hadoop.hbase.client.{Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.util.Bytes

/**
  * Data-access object for twits stored in HBase.
  *
  * Initialising this object opens one HBase connection and one table handle
  * (see `twitConn` / `twitTable`); every method below goes through that shared handle.
  */
object TwitDao {
  // Name of the table that holds twits
  val TABLE = "twits"
  // Column family that holds the twit columns
  val CF_INFO = Bytes.toBytes("info")
  // Column qualifier: user id
  val CQ_USER_ID = Bytes.toBytes("userId")
  // Column qualifier: timestamp
  val CQ_TIMESTAMP = Bytes.toBytes("timestamp")
  // Column qualifier: message
  val CQ_MSG = Bytes.toBytes("msg")

  // HBase connection, shared globally
  val twitConn = HbaseUtils.getConnection(Map("hbase.zookeeper.quorum" -> "anode"))
  // Handle on the twits table, shared globally
  val twitTable = HbaseUtils.getTable(twitConn, TABLE)

  // Index of the row-key byte that is incremented to build the exclusive stop row of a user scan.
  // NOTE(review): presumably the last byte of the user-derived row-key prefix — confirm against
  // RowKeyUtils.createRowKey. If that byte can be 0xFF the increment wraps to 0x00 — verify.
  private val UserPrefixLastIndex = 31

  /**
    * Stores one twit in HBase.
    *
    * The row key is the twit id; user id, timestamp and message are written to the
    * `info` column family.
    *
    * @param twit the twit to store
    */
  def put(twit: Twit): Unit = {
    val row = new Put(twit.id)
    // NOTE(review): userId and msg are passed without an explicit Bytes.toBytes; presumably an
    // implicit conversion from Common._ turns them into bytes — confirm.
    row.addColumn(CF_INFO, CQ_USER_ID, twit.userId)
    row.addColumn(CF_INFO, CQ_TIMESTAMP, Bytes.toBytes(twit.timestamp))
    row.addColumn(CF_INFO, CQ_MSG, twit.msg)
    twitTable.put(row)
  }

  /**
    * Fetches one twit from HBase by its id.
    *
    * NOTE(review): no check is made that the row exists; for a missing row the fetched cells
    * are presumably null and decoding the timestamp would fail — confirm with callers.
    *
    * @param id the twit id (row key)
    * @return the twit stored under that id
    */
  def get(id: Array[Byte]): Twit =
    toTwit(id, twitTable.get(new Get(id)))

  /**
    * Deletes one twit from HBase by its id.
    *
    * @param id the twit id (row key)
    */
  def delete(id: Array[Byte]): Unit = {
    val row = new Delete(id)
    twitTable.delete(row)
  }

  /**
    * Scans the twits sent by one user.
    *
    * The scan starts at the row key built from the user id and the negated current time,
    * and stops at a copy of that key whose byte at `UserPrefixLastIndex` has been
    * incremented by one (i.e. where the next user's keys are expected to begin).
    *
    * @param userId the user id
    * @return the user's twits, in the order the scanner returned them
    */
  def scanByUserId(userId: String): List[Twit] = {
    // Build the start and stop row keys; the scan ends at the next user
    val startRow = RowKeyUtils.createRowKey(userId, -1 * System.currentTimeMillis())
    val stopRow = Bytes.copy(startRow)
    stopRow(UserPrefixLastIndex) = (stopRow(UserPrefixLastIndex) + 1).toByte
    // Scan from the start row up to the stop row
    val scan = new Scan().withStartRow(startRow).withStopRow(stopRow)
    val resultScanner = twitTable.getScanner(scan)
    try {
      val rows = resultScanner.iterator()
      // A builder appends in scan order, so no final reverse is needed
      val twits = List.newBuilder[Twit]
      while (rows.hasNext) {
        val row = rows.next()
        twits += toTwit(row.getRow, row)
      }
      twits.result()
    } finally {
      // Always release the scanner, even when decoding a row throws
      resultScanner.close()
    }
  }

  /**
    * Decodes the `info` columns of an HBase result into a twit.
    *
    * @param id     the id to put on the resulting twit
    * @param result the HBase row to read user id, timestamp and message from
    * @return the decoded twit
    */
  private def toTwit(id: Array[Byte], result: Result): Twit = {
    val userId = Bytes.toString(result.getValue(CF_INFO, CQ_USER_ID))
    val timestamp = Bytes.toLong(result.getValue(CF_INFO, CQ_TIMESTAMP))
    val msg = Bytes.toString(result.getValue(CF_INFO, CQ_MSG))
    Twit(id, userId, timestamp, msg)
  }

}
